{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "\n",
    "pip install -q pandas==0.23.0\n",
    "pip install -q numpy==1.14.3\n",
    "pip install -q matplotlib==3.0.3\n",
    "pip install -q seaborn==0.8.1\n",
    "pip install -q PyAthena==1.8.0"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from datetime import datetime\n",
    "\n",
    "import matplotlib.pyplot as plt\n",
    "import numpy as np\n",
    "import pandas as pd\n",
    "import seaborn as sns\n",
    "\n",
    "from scipy.sparse import lil_matrix\n",
    "\n",
    "import boto3\n",
    "import botocore\n",
    "import sagemaker"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "session = boto3.session.Session()\n",
    "region_name = session.region_name\n",
    "\n",
    "sagemaker_session = sagemaker.Session()\n",
    "bucket = sagemaker_session.default_bucket()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Data Preparation"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Create New Table with Sentiment\n",
    "We no longer need a separate table.\n",
    "\n",
    "This will be used by the model training."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This following query takes a long time (approx. 5min / scanning 50GB).\n",
    "Find the equivalent in the sql/ directory and run within Athena Console.\n",
    "\n",
    "**[TODO] Show screenshot of Athena console where to put in the query with link to github sql statement**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyathena import connect\n",
    "from pyathena.util import as_pandas\n",
    "\n",
    "database_name = 'dsoaws'\n",
    "table_name = 'amazon_reviews_parquet'\n",
    "sentiment_table_name = 'amazon_reviews_with_sentiment'\n",
    "table_prefix = 'amazon_reviews_with_sentiment'\n",
    "\n",
    "s3_staging_dir = 's3://{0}/athena/staging'.format(bucket)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "cursor = connect(region_name=region_name, s3_staging_dir=s3_staging_dir).cursor()\n",
    "try:\n",
    "    cursor.execute('CREATE TABLE IF NOT EXISTS {} \\\n",
    "                WITH ( \\\n",
    "                  format = \\'PARQUET\\', \\\n",
    "                  external_location = \\'s3://{}/{}/\\', \\\n",
    "                      partitioned_by = ARRAY[\\'product_category\\'] \\\n",
    "                ) \\\n",
    "                AS \\\n",
    "                SELECT customer_id, \\\n",
    "                         review_id, \\\n",
    "                         product_id, \\\n",
    "                         product_title, \\\n",
    "                         review_headline, \\\n",
    "                         review_body, \\\n",
    "                         review_date, \\\n",
    "                         year, \\\n",
    "                         star_rating, \\\n",
    "                         CASE \\\n",
    "                             WHEN star_rating > 3 THEN 1 \\\n",
    "                             ELSE 0 \\\n",
    "                         END AS sentiment, \\\n",
    "                         product_category \\\n",
    "                FROM {}.{} \\\n",
    "                WHERE LENGTH(review_body) > 20'\n",
    "               .format(sentiment_table_name, bucket, table_prefix, database_name, table_name))\n",
    "except:\n",
    "    pass\n",
    " "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### View Sentiment"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This following query takes a long time... find the equivalent in the sql/ directory and run within Athena Console."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "cursor = connect(region_name=region_name, s3_staging_dir=s3_staging_dir).cursor()\n",
    "\n",
    "cursor.execute('SELECT customer_id, \\\n",
    "                         review_id, \\\n",
    "                         product_id, \\\n",
    "                         product_title, \\\n",
    "                         review_headline, \\\n",
    "                         review_body, \\\n",
    "                         review_date, \\\n",
    "                         year, \\\n",
    "                         star_rating, \\\n",
    "                         sentiment, \\\n",
    "                         product_category \\\n",
    "                FROM {}.{} \\\n",
    "                WHERE product_category = \\'Digital_Video_Download\\' \\\n",
    "                ORDER BY review_id \\\n",
    "                LIMIT 500'\n",
    "               .format(database_name, sentiment_table_name))\n",
    "df = as_pandas(cursor)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df.head(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(df['sentiment'].value_counts())\n",
    "sns.countplot(x='sentiment', data=df)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Handling imbalanced datasets\n",
    "\n",
    "Here you can see we have a larger number of `positive` samples vs. `negative` ones. There are a number of techniques to blance this dataset out and the two most popular approaches are to either under-sample or over-sample. With under sampling you remove rows to balance the dataset out and in over sampling you can duplicate entries in the daatset which could lead to overfitting. This discussion is beyond the scope of this lab. You will under sample the data to balance the dataset but you can find more information [here]()."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sklearn.utils import resample\n",
    "\n",
    "positive = df[df['sentiment']==1]\n",
    "negative = df[df['sentiment']==0]\n",
    "\n",
    "positive_downsampled = resample(positive,\n",
    "                                replace = False, # sample without replacement\n",
    "                                n_samples = len(negative), # match minority n\n",
    "                                random_state = 27) # reproducible results\n",
    "\n",
    "# combine minority and downsampled majority\n",
    "df = pd.concat([positive_downsampled, negative])\n",
    "\n",
    "# checking counts\n",
    "print(df['sentiment'].value_counts())\n",
    "\n",
    "sns.countplot(x='sentiment', data=df)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Create Test, Train, and Validation Datasets\n",
    "\n",
    "Depending on the framework you are leveraging in your AI/ML workloads you may decide to split the data into test, train, and validate splits before uploading to S3. You can leverage some built in functions in the sklearn package to do the split. To learn more about the sklearn framework click [here](https://scikit-learn.org/stable/)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sklearn.model_selection import train_test_split\n",
    "\n",
    "train, test = train_test_split(df, test_size=0.2, random_state=0)\n",
    "test, validate = train_test_split(test, test_size=0.5, random_state=0)\n",
    "\n",
    "print(f'Number of training examples: {len(train.index)}')\n",
    "print(f'Number of testing examples: {len(test.index)}')\n",
    "print(f'Number of validation examples: {len(validate.index)}')\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Visualize the Train, Test, and Validation Split"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Pie chart, where the slices will be ordered and plotted counter-clockwise:\n",
    "labels = ['Train', 'Validation', 'Test']\n",
    "sizes = [len(train.index), len(validate.index), len(test.index)]\n",
    "explode = (0.1, 0, 0)  \n",
    "\n",
    "fig1, ax1 = plt.subplots()\n",
    "\n",
    "ax1.pie(sizes, explode=explode, labels=labels, autopct='%1.1f%%', startangle=90)\n",
    "\n",
    "# Equal aspect ratio ensures that pie is drawn as a circle.\n",
    "ax1.axis('equal')  \n",
    "\n",
    "plt.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# df = df[['customer_id', 'product_id', 'product_title', 'star_rating', 'review_date']]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# df.head(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# df.shape"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "customers = df['customer_id'].value_counts()\n",
    "products = df['product_id'].value_counts()\n",
    "\n",
    "num_customers = customers.count()\n",
    "print(num_customers)\n",
    "num_products = products.count()\n",
    "print(num_products)\n",
    "num_features = num_customers + num_products\n",
    "print(num_features)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "TODO:  Filter out customers who haven't rated many movies"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#customers = customers[customers >= 5]\n",
    "#products = products[products >= 10]\n",
    "\n",
    "reduced_df = df.merge(pd.DataFrame({'customer_id': customers.index})).merge(pd.DataFrame({'product_id': products.index}))\n",
    "reduced_df"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create a sequential index for customers and movies"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "customers = reduced_df['customer_id'].value_counts()\n",
    "products = reduced_df['product_id'].value_counts()\n",
    "\n",
    "#customers = df['customer_id'].value_counts()\n",
    "#products = df['product_id'].value_counts()\n",
    "\n",
    "print(customers)\n",
    "print(products)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Preserve the index for later use"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Balancing dataset\n",
    "[TODO] add this here"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "customer_index = pd.DataFrame({'customer_id': customers.index, \n",
    "                               'user': np.arange(customers.shape[0])})\n",
    "print(customer_index.shape)\n",
    "\n",
    "product_index = pd.DataFrame({'product_id': products.index, \n",
    "                              'item': np.arange(products.shape[0]) + customer_index.shape[0]})\n",
    "print(product_index.shape)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "reduced_df = reduced_df.merge(customer_index).merge(product_index)\n",
    "reduced_df.head()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Count days since first review (included as a feature to capture trend)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "reduced_df['review_date'] = pd.to_datetime(reduced_df['review_date'])\n",
    "customer_first_date = reduced_df.groupby('customer_id')['review_date'].min().reset_index()\n",
    "customer_first_date.columns = ['customer_id', 'first_review_date']"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "reduced_df = reduced_df.merge(customer_first_date)\n",
    "reduced_df['days_since_first'] = (reduced_df['review_date'] - reduced_df['first_review_date']).dt.days\n",
    "reduced_df['days_since_first'] = reduced_df['days_since_first'].fillna(0)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Split into train and test datasets"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "train_df = reduced_df.groupby('customer_id').last().reset_index()\n",
    "train_df"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "test_df = reduced_df.merge(train_df[['customer_id', 'product_id']], \n",
    "                            on=['customer_id', 'product_id'], \n",
    "                            how='outer', \n",
    "                            indicator=True)\n",
    "test_df = test_df[(test_df['_merge'] == 'left_only')]\n",
    "test_df"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "- Factorization machines expects data to look something like:\n",
    "  - Sparse matrix\n",
    "  - Target variable is that user's rating for a movie\n",
    "  - One-hot encoding for users ($N$ features)\n",
    "  - One-hot encoding for movies ($M$ features)\n",
    "\n",
    "|Rating|User1|User2|...|UserN|Movie1|Movie2|Movie3|...|MovieM|Feature1|Feature2|...|\n",
    "|---|---|---|---|---|---|---|---|---|---|---|---|---|\n",
    "|4|1|0|...|0|1|0|0|...|0|20|2.2|...|\n",
    "|5|1|0|...|0|0|1|0|...|0|17|9.1|...|\n",
    "|3|0|1|...|0|1|0|0|...|0|3|11.0|...|\n",
    "|4|0|1|...|0|0|0|1|...|0|15|6.4|...|"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from scipy.sparse import csr_matrix\n",
    "\n",
    "def to_csr_matrix(df, num_users, num_items):\n",
    "    feature_dim = num_users + num_items + 1\n",
    "    data = np.concatenate([np.array([1] * df.shape[0]),\n",
    "                           np.array([1] * df.shape[0]),\n",
    "                           df['days_since_first'].values])\n",
    "    row = np.concatenate([np.arange(df.shape[0])] * 3)\n",
    "    col = np.concatenate([df['user'].values,\n",
    "                          df['item'].values,\n",
    "                          np.array([feature_dim - 1] * df.shape[0])])\n",
    "    return csr_matrix((data, (row, col)), \n",
    "                      shape=(df.shape[0], feature_dim), \n",
    "                      dtype=np.float32)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "train_csr = to_csr_matrix(train_df, customer_index.shape[0], product_index.shape[0])\n",
    "print(train_csr)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "test_csr = to_csr_matrix(test_df, customer_index.shape[0], product_index.shape[0])\n",
    "print(test_csr)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Convert to sparse recordIO-wrapped protobuf that SageMaker factorization machines expects"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import io\n",
    "import sagemaker.amazon.common as smac\n",
    "\n",
    "fm_prefix = 'factorization-machines'\n",
    "\n",
    "def upload_to_s3_as_protobuf(csr, label, bucket, prefix, channel, splits):\n",
    "    indices = np.array_split(np.arange(csr.shape[0]), splits)\n",
    "    for i in range(len(indices)):\n",
    "        index = indices[i]\n",
    "        buf = io.BytesIO()\n",
    "        smac.write_spmatrix_to_sparse_tensor(buf, csr[index, ], label[index])\n",
    "        buf.seek(0)\n",
    "\n",
    "        boto3.client('s3').upload_fileobj(buf, bucket, '{}/{}/data-{}'.format(prefix, channel, i))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "upload_to_s3_as_protobuf(train_csr, train_df['star_rating'].values.astype(np.float32), bucket, fm_prefix, channel='train', splits=10)\n",
    "upload_to_s3_as_protobuf(test_csr, test_df['star_rating'].values.astype(np.float32), bucket, fm_prefix, channel='test', splits=1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "conda_python3",
   "language": "python",
   "name": "conda_python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
